package com.ld.shieldsb.canalclient.client;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.SystemUtils;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.Pair;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.CanalEntry.TransactionBegin;
import com.alibaba.otter.canal.protocol.CanalEntry.TransactionEnd;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import com.ld.shieldsb.canalclient.handler.ICanalDataSyncHandler;
import com.ld.shieldsb.canalclient.handler.config.CanalClientConfig;
import com.ld.shieldsb.canalclient.model.SyncSwitch;
import com.ld.shieldsb.canalclient.recoder.Recorder;
import com.ld.shieldsb.canalclient.recoder.RecorderHandler;
import com.ld.shieldsb.common.core.collections.ListUtils;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Data
public class BaseCanalClient {

    // Line separator used when assembling the multi-line log templates below.
    protected static final String SEP = SystemUtils.LINE_SEPARATOR;
    // Date pattern used when rendering timestamps in log output.
    protected static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
    // Running flag exposed through isRunning(); volatile for cross-thread visibility.
    protected volatile boolean running = false;
    // Thread uncaught-exception handler: logs the error
    protected Thread.UncaughtExceptionHandler handler = (t, e) -> log.error("parse events has an error", e);
    protected Thread thread = null; // processing thread
    protected CanalConnector connector;
    // SLF4J message templates; assigned once in the static initializer.
    protected static String context_format = null;
    protected static String row_format = null;
    protected static String transaction_format = null;

    protected String name; // display name
    protected String destination;
    protected String uniqueKey; // unique key; can hold the id or uuid of the corresponding model

    protected List<ICanalDataSyncHandler> handlers = null; // sync handlers; there may be several
    protected RecorderHandler recorderHandler = null; // recorder

    public static final Integer RUNNING_SUCCESS = 1; // normal
    public static final Integer RUNNING_ERROR = 2; // error
    public static final Integer RUNNING_OFF = 3; // data synchronization switched off

    protected SyncSwitch syncSwitch; // switch
    protected CanalClientConfig canalClientConfig; // configuration parameters

    // Current running state; one of the RUNNING_* constants above.
    protected Integer runningState = RUNNING_SUCCESS;

    static {
        context_format = SEP + "****************************************************" + SEP;
        context_format += "* Batch Id: [{}] ,count : [{}] , memsize : [{}] , Time : {}" + SEP;
        context_format += "* Start : [{}] " + SEP;
        context_format += "* End : [{}] " + SEP;
        context_format += "****************************************************" + SEP;

        row_format = SEP
                + "----------------> binlog[{}:{}] , name[{},{}] , eventType : {} , executeTime : {}({}) , gtid : ({}) , delay : {} ms"
                + SEP;

        transaction_format = SEP + "================> binlog[{}:{}] , executeTime : {}({}) , gtid : ({}) , delay : {}ms" + SEP;

    }

    /**
     * Logs a summary banner for a fetched batch: the batch id, the entry count, the summed event length of all
     * entries, the current time, and the positions of the first and last entry (both {@code null} when the batch
     * has no entries).
     *
     * @author 吕凯
     * @date 2021-11-13 15:36:45
     * @param message
     *            batch whose entries are summarized
     * @param batchId
     *            batch id, logged as given
     * @param size
     *            entry count supplied by the caller, logged as given
     */
    protected void printSummary(Message message, long batchId, int size) {
        List<Entry> entries = message.getEntries();

        // Total size is the sum of the event lengths recorded in the entry headers.
        long totalEventLength = entries.stream().mapToLong(e -> e.getHeader().getEventLength()).sum();

        boolean hasEntries = !CollectionUtils.isEmpty(entries);
        String firstPosition = hasEntries ? buildPositionForDump(entries.get(0)) : null;
        String lastPosition = hasEntries ? buildPositionForDump(entries.get(entries.size() - 1)) : null;

        String now = new SimpleDateFormat(DATE_FORMAT).format(new Date());
        log.info(context_format, batchId, size, totalEventLength, now, firstPosition, lastPosition);
    }

    /**
     * Renders an entry's binlog position as {@code file:offset:executeTime(formatted time)}, followed by
     * {@code " gtid(...)"} when the header carries a non-empty GTID.
     *
     * @param entry
     *            entry whose header supplies the position data
     * @return the position string
     */
    protected String buildPositionForDump(Entry entry) {
        CanalEntry.Header header = entry.getHeader();
        long executeTime = header.getExecuteTime();
        String executeTimeText = new SimpleDateFormat(DATE_FORMAT).format(new Date(executeTime));

        StringBuilder position = new StringBuilder();
        position.append(header.getLogfileName()).append(":");
        position.append(header.getLogfileOffset()).append(":");
        position.append(executeTime).append("(").append(executeTimeText).append(")");

        String gtid = header.getGtid();
        if (StringUtils.isNotEmpty(gtid)) {
            position.append(" gtid(").append(gtid).append(")");
        }
        return position.toString();
    }

    /**
     * Logs a human-readable dump of every entry in the list.
     * <p>
     * Transaction begin/end entries are logged with their binlog position, the thread id (begin) or transaction id
     * (end) and any XA properties. Row-data entries are logged with a header line followed by either the SQL text
     * (for QUERY events or DDL) or the column values of each changed row: the before-image for DELETE, the
     * after-image for everything else. Entries of any other type are skipped.
     *
     * @author 吕凯
     * @date 2021-11-13 15:36:03
     * @param entrys
     *            entries to dump
     * @throws RuntimeException
     *             if the stored value of an entry cannot be parsed
     */
    protected void printEntry(List<Entry> entrys) {
        // One formatter for the whole call; it stays method-local because SimpleDateFormat is not thread-safe.
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_FORMAT);

        for (Entry entry : entrys) {
            CanalEntry.Header header = entry.getHeader();
            long executeTime = header.getExecuteTime();
            // Delay between the event's execute time and now, in milliseconds.
            long delayTime = System.currentTimeMillis() - executeTime;
            String executeTimeText = simpleDateFormat.format(new Date(executeTime));
            EntryType entryType = entry.getEntryType();

            if (entryType == EntryType.TRANSACTIONBEGIN) {
                TransactionBegin begin;
                try {
                    begin = TransactionBegin.parseFrom(entry.getStoreValue());
                } catch (InvalidProtocolBufferException e) {
                    throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                }
                // Transaction header: position line first, then the executing thread id and XA info.
                logTransactionLine(header, executeTimeText, delayTime);
                log.info(" BEGIN ----> Thread id: {}", begin.getThreadId());
                printXAInfo(begin.getPropsList());
            } else if (entryType == EntryType.TRANSACTIONEND) {
                TransactionEnd end;
                try {
                    end = TransactionEnd.parseFrom(entry.getStoreValue());
                } catch (InvalidProtocolBufferException e) {
                    throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                }
                // Transaction commit: transaction id and XA info first, position line last.
                log.info("----------------\n");
                log.info(" END ----> transaction id: {}", end.getTransactionId());
                printXAInfo(end.getPropsList());
                logTransactionLine(header, executeTimeText, delayTime);
            } else if (entryType == EntryType.ROWDATA) {
                printRowDataEntry(entry, executeTimeText, delayTime);
            }
        }
    }

    /** Logs the binlog position line shared by transaction begin and end entries. */
    private void logTransactionLine(CanalEntry.Header header, String executeTimeText, long delayTime) {
        log.info(transaction_format, header.getLogfileName(), String.valueOf(header.getLogfileOffset()),
                String.valueOf(header.getExecuteTime()), executeTimeText, header.getGtid(), String.valueOf(delayTime));
    }

    /** Parses a row-data entry and logs its header line plus either its SQL text or its changed columns. */
    private void printRowDataEntry(Entry entry, String executeTimeText, long delayTime) {
        RowChange rowChange;
        try {
            rowChange = RowChange.parseFrom(entry.getStoreValue());
        } catch (Exception e) {
            throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
        }

        CanalEntry.Header header = entry.getHeader();
        EventType eventType = rowChange.getEventType();

        log.info(row_format, header.getLogfileName(), String.valueOf(header.getLogfileOffset()), header.getSchemaName(),
                header.getTableName(), eventType, String.valueOf(header.getExecuteTime()), executeTimeText,
                header.getGtid(), String.valueOf(delayTime));

        // QUERY events and DDL are logged as SQL text only; no row images are printed for them.
        if (eventType == EventType.QUERY || rowChange.getIsDdl()) {
            log.info("ddl : {} ,  sql ----> {}{}", rowChange.getIsDdl(), rowChange.getSql(), SEP);
            return;
        }

        printXAInfo(rowChange.getPropsList());
        for (RowData rowData : rowChange.getRowDatasList()) {
            // DELETE prints the before-image; every other event type prints the after-image.
            printColumn(eventType == EventType.DELETE ? rowData.getBeforeColumnsList() : rowData.getAfterColumnsList());
        }
    }

    /**
     * Logs one line per column: {@code name : value}, the MySQL type, and an {@code update=true} marker when the
     * column is flagged as updated.
     * <p>
     * For columns whose MySQL type contains "BLOB" or "BINARY" (case-insensitive), the value is re-encoded to
     * ISO-8859-1 bytes and decoded as UTF-8 before printing. Charset constants are used for this, so no checked
     * encoding exception can occur and the name/value part is never silently dropped from the line.
     *
     * @author 吕凯
     * @date 2021-11-13 15:37:06
     * @param columns
     *            columns to print
     */
    protected void printColumn(List<Column> columns) {
        for (Column column : columns) {
            String mysqlType = column.getMysqlType();
            String value = column.getValue();
            if (StringUtils.containsIgnoreCase(mysqlType, "BLOB") || StringUtils.containsIgnoreCase(mysqlType, "BINARY")) {
                // Recover the raw bytes carried in the string, then decode them as UTF-8 text.
                value = new String(value.getBytes(StandardCharsets.ISO_8859_1), StandardCharsets.UTF_8);
            }

            StringBuilder builder = new StringBuilder();
            builder.append(column.getName()).append(" : ").append(value);
            builder.append("    type=").append(mysqlType);
            if (column.getUpdated()) {
                builder.append("    update=").append(column.getUpdated());
            }
            builder.append(SEP);
            log.info(builder.toString());
        }
    }

    /**
     * Logs the XA type and XA xid found in the given properties. Nothing is logged unless both a key ending in
     * "XA_TYPE" and a key ending in "XA_XID" (case-insensitive) are present; if a key occurs more than once, the
     * last occurrence wins.
     *
     * @param pairs
     *            property pairs to scan; {@code null} is ignored
     */
    protected void printXAInfo(List<Pair> pairs) {
        if (pairs != null) {
            String type = null;
            String xid = null;
            for (Pair property : pairs) {
                String propertyKey = property.getKey();
                boolean isType = StringUtils.endsWithIgnoreCase(propertyKey, "XA_TYPE");
                if (isType) {
                    type = property.getValue();
                } else if (StringUtils.endsWithIgnoreCase(propertyKey, "XA_XID")) {
                    xid = property.getValue();
                }
            }

            if (type == null || xid == null) {
                return;
            }
            log.info(" ------> " + type + " " + xid);
        }
    }

    /**
     * Returns the registered handlers, or a new empty list when none have been registered yet. The empty list is
     * not stored in the field, so adding to it does not register anything.
     *
     * @return the handler list, never {@code null}
     */
    public List<ICanalDataSyncHandler> getHandlers() {
        if (handlers != null) {
            return handlers;
        }
        return new ArrayList<>();
    }

    /**
     * Checks whether the connection is valid.
     *
     * @author 吕凯
     * @date 2021-12-02 18:25:14
     * @return {@code false} when no connector is set; otherwise the result of the connector's own validity check
     */
    public boolean checkConnValid() {
        return connector != null && connector.checkValid();
    }

    /**
     * Reports whether the client has been started.
     *
     * @author 吕凯
     * @date 2021-12-15 16:42:32
     * @return the current value of the running flag
     */
    public boolean isRunning() {
        return this.running;
    }

    /**
     * Returns the recorder held by the recorder handler.
     *
     * @author 吕凯
     * @date 2021-12-16 13:54:44
     * @return the recorder, or {@code null} when there is no recorder handler or it has no recorder
     */
    public Recorder getRecorder() {
        // Read the field once so the null check and the call see the same handler.
        RecorderHandler currentHandler = recorderHandler;
        if (currentHandler == null) {
            return null;
        }
        return currentHandler.getRecorder();
    }

    /**
     * Appends a handler to the handler list, creating the list on first use. A handler that does not report
     * itself as initialized is rejected with an error log and is not added.
     *
     * @author 吕凯
     * @date 2021-11-13 16:15:57
     * @param handler
     *            handler to register
     */
    public void addHandler(ICanalDataSyncHandler handler) {
        if (handlers == null) {
            handlers = new ArrayList<>();
        }
        if (!handler.getInit()) {
            // Not initialized: refuse to register.
            log.error("{}[{}]未初始化！注册失败！", handler.getType(), handler.getKey());
            return;
        }
        handlers.add(handler);
        log.warn("{}[{}]注册成功！", handler.getType(), handler.getKey());
    }

    /**
     * Inserts a handler at the given position in the handler list, creating the list on first use. A handler
     * that does not report itself as initialized is rejected with an error log and is not added.
     * <p>
     * Log messages use the same parameterized {@code type[key]} format as {@link #addHandler(ICanalDataSyncHandler)},
     * so both registration paths identify the handler the same way.
     *
     * @param index
     *            position at which to insert the handler
     * @param handler
     *            handler to register
     * @throws IndexOutOfBoundsException
     *             if {@code index} is rejected by {@link List#add(int, Object)}
     */
    public void addHandler(int index, ICanalDataSyncHandler handler) {
        if (handlers == null) {
            handlers = new ArrayList<>();
        }
        if (handler.getInit()) {
            handlers.add(index, handler);
            log.warn("{}[{}]注册成功！", handler.getType(), handler.getKey());
        } else {
            log.error("{}[{}]未初始化！注册失败！", handler.getType(), handler.getKey());
        }
    }

    /**
     * Returns the current entry's GTID, read from the header property {@code "curtGtid"}.
     *
     * @param header
     *            entry header whose properties are searched
     * @return the property value, or an empty string when the property is absent
     */
    public static String getCurrentGtid(CanalEntry.Header header) {
        final String propertyKey = "curtGtid";
        return getCurrentGtid(header, propertyKey);
    }

    /**
     * Returns the current entry's GTID sequence number, read from the header property {@code "curtGtidSn"}.
     *
     * @param header
     *            entry header whose properties are searched
     * @return the property value, or an empty string when the property is absent
     */
    public static String getCurrentGtidSn(CanalEntry.Header header) {
        final String propertyKey = "curtGtidSn";
        return getCurrentGtid(header, propertyKey);
    }

    /**
     * Returns the current entry's GTID last-committed value, read from the header property {@code "curtGtidLct"}.
     *
     * @param header
     *            entry header whose properties are searched
     * @return the property value, or an empty string when the property is absent
     */
    public static String getCurrentGtidLct(CanalEntry.Header header) {
        final String propertyKey = "curtGtidLct";
        return getCurrentGtid(header, propertyKey);
    }

    /**
     * Looks up a header property by exact key.
     *
     * @param header
     *            entry header whose properties are searched
     * @param key
     *            property key to match
     * @return the value of the first property whose key equals {@code key}, or an empty string when there is none
     */
    public static String getCurrentGtid(CanalEntry.Header header, String key) {
        List<CanalEntry.Pair> properties = header.getPropsList();
        if (!ListUtils.isNotEmpty(properties)) {
            return "";
        }
        for (CanalEntry.Pair property : properties) {
            if (!key.equals(property.getKey())) {
                continue;
            }
            return property.getValue();
        }
        return "";
    }

    /**
     * Reads this client's sync switch with a timeout. Per the original documentation, the read blocks while the
     * switch is off, until the timeout elapses.
     * <p>
     * Nothing happens when no switch object is configured or the switch key does not exist. When the switch is
     * currently off, the running state is set to {@link #RUNNING_OFF} before waiting.
     * <p>
     * The caller's {@code unit} is passed through to the switch; previously it was ignored and the timeout was
     * always interpreted as minutes.
     *
     * @author 吕凯
     * @date 2022-01-12 11:50:37
     * @param timeout
     *            maximum time to wait
     * @param unit
     *            unit of {@code timeout}
     * @throws InterruptedException
     *             propagated from the switch read
     * @throws TimeoutException
     *             propagated from the switch read
     */
    public void switchGet(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        if (syncSwitch == null) {
            return;
        }
        String switchKey = getSwitchKey();
        if (!syncSwitch.exists(switchKey)) {
            return;
        }
        if (!syncSwitch.status(switchKey)) { // switch is off
            setRunningState(RUNNING_OFF);
        }
        syncSwitch.get(switchKey, timeout, unit);
    }

    /**
     * Builds the key under which this client's switch is stored: {@code "des:" + uniqueKey + "_" + destination}.
     *
     * @author 吕凯
     * @date 2022-01-12 17:47:16
     * @return the switch key
     */
    public String getSwitchKey() {
        StringBuilder switchKey = new StringBuilder();
        switchKey.append("des:").append(uniqueKey);
        switchKey.append("_").append(destination);
        return switchKey.toString();
    }

    /**
     * Reads this client's sync switch without a timeout.
     * <p>
     * Nothing happens when no switch object is configured or the switch key does not exist. When the switch is
     * currently off, the running state is set to {@link #RUNNING_OFF} before the switch is read.
     *
     * @author 吕凯
     * @date 2022-01-12 11:51:07
     * @throws InterruptedException
     *             propagated from the switch read
     * @throws TimeoutException
     *             propagated from the switch read
     */
    public void switchGet() throws InterruptedException, TimeoutException {
        if (syncSwitch == null) {
            return;
        }
        final String key = getSwitchKey();
        if (!syncSwitch.exists(key)) {
            return;
        }
        boolean switchedOn = syncSwitch.status(key);
        if (!switchedOn) { // switch is off
            setRunningState(RUNNING_OFF);
        }
        syncSwitch.get(key);
    }

}
